[redis-rs][core] Move connection refresh to the background #2915
31cf02f to f850d99
This is a substantial change in the core mechanics. It must include tests that fail without this change, to prove its necessity.
Instead of splitting your state between multiple maps, which leaves room for error by reading the wrong map, consider using an enum that represents each connection state (Usable, Reconnecting, etc.) in the main connection map. See
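A minimal sketch of the enum-based layout suggested above. Everything here is hypothetical (`Conn`, `ConnState`, `ConnectionMap`, and the method names are illustrative, not the actual glide-core types); it only shows how a single map keyed by address can carry the state that would otherwise be split across several maps.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a real connection handle.
#[derive(Debug, Clone, PartialEq)]
struct Conn(u32);

// Hypothetical per-address state; the variant names follow the review comment.
#[derive(Debug, PartialEq)]
enum ConnState {
    Usable(Conn),
    // Keeps the previous connection (if any) next to the "refresh in progress" marker.
    Reconnecting { previous: Option<Conn> },
}

#[derive(Default)]
struct ConnectionMap {
    nodes: HashMap<String, ConnState>,
}

impl ConnectionMap {
    fn insert_usable(&mut self, addr: &str, conn: Conn) {
        self.nodes.insert(addr.to_string(), ConnState::Usable(conn));
    }

    // Moves a usable entry into the Reconnecting state.
    // Returns false if the address is unknown or already reconnecting.
    fn mark_reconnecting(&mut self, addr: &str) -> bool {
        match self.nodes.remove(addr) {
            Some(ConnState::Usable(conn)) => {
                self.nodes.insert(
                    addr.to_string(),
                    ConnState::Reconnecting { previous: Some(conn) },
                );
                true
            }
            Some(other) => {
                // Put the entry back unchanged.
                self.nodes.insert(addr.to_string(), other);
                false
            }
            None => false,
        }
    }

    // Only entries in the Usable state are handed out.
    fn usable(&self, addr: &str) -> Option<&Conn> {
        match self.nodes.get(addr)? {
            ConnState::Usable(conn) => Some(conn),
            ConnState::Reconnecting { .. } => None,
        }
    }
}
```

Because every lookup goes through one `match`, the compiler forces each call site to decide what to do with a reconnecting entry, which is the error class the comment is worried about.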
ffbd018 to 30b35f2
First comments (stopped before fn update_refreshed_connection).
Please see all the inline comments, and check how it behaves when refresh connections is called with only management connections, only user connections, or both. Also, maybe I haven't got to it yet, but the refresh task should be bound to the lifetime of the clusterNode / address, so that it is cancelled when refresh_slots removes the node from the topology.
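One way to read the lifetime requirement above, sketched with the standard library only. This is an assumption-heavy illustration: `RefreshTask`, `ClusterNode`, and `remove_node` are hypothetical, and a thread with a cancellation flag stands in for the tokio task (with tokio, the `Drop` impl would more likely call `JoinHandle::abort`). The point is that the task handle is owned by the node entry, so removing the node cancels the task.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical guard that owns a background refresh loop.
struct RefreshTask {
    cancelled: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl RefreshTask {
    fn spawn(address: String) -> Self {
        let cancelled = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancelled);
        let handle = thread::spawn(move || {
            // Placeholder retry loop; a real task would try to reconnect to `_target`.
            let _target = address.as_str();
            while !flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5));
            }
        });
        RefreshTask { cancelled, handle: Some(handle) }
    }

    // Exposed only so callers can observe cancellation.
    fn cancel_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.cancelled)
    }
}

impl Drop for RefreshTask {
    // Dropping the guard cancels the loop and waits for it to finish.
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

// Hypothetical node entry that owns its refresh task.
struct ClusterNode {
    refresh: Option<RefreshTask>,
}

// Removing the node drops its RefreshTask, which cancels the loop.
fn remove_node(topology: &mut HashMap<String, ClusterNode>, address: &str) -> bool {
    topology.remove(address).is_some()
}
```

With this ownership, no separate "cancel on topology change" path is needed; the cleanup follows from removing the map entry.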
glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
@@ -1798,9 +1858,8 @@ where
if !failed_connections.is_empty() {
    Self::refresh_connections(
        inner,
        failed_connections,
        failed_connections.into_iter().collect::<HashSet<String>>(),
It creates an unnecessary copy. Instead, you can make failed_connections a set to begin with.
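A small sketch of that suggestion, assuming (hypothetically) that the failures come out of a list of per-node results. `collect_failed` and its input type are illustrative only; the idea is to build the `HashSet<String>` directly where the failures are gathered, so no second collection is created at the call site.

```rust
use std::collections::HashSet;

// Hypothetical input: Ok(()) for a healthy node, Err(address) for a failed one.
// The failed addresses are gathered straight into a set, which also deduplicates them.
fn collect_failed(results: Vec<Result<(), String>>) -> HashSet<String> {
    results.into_iter().filter_map(Result::err).collect()
}
```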
40744cb to 4e6535f
6ad1fd7 to 66a3e39
a8588c1 to f51b647
Signed-off-by: GilboaAWS <[email protected]>
This reverts commit 4e6535f. Signed-off-by: GilboaAWS <[email protected]>
…eturned the refresh_connection logic of sending the connection but without removing it from the connection_map Signed-off-by: GilboaAWS <[email protected]>
… the tokio refresh task Signed-off-by: GilboaAWS <[email protected]>
…tion before redirecting to a random node Signed-off-by: GilboaAWS <[email protected]>
…ot needed in rust where tokio runs the synced poll_flush to the end and only then let the refresh task to update the connection map Signed-off-by: GilboaAWS <[email protected]>
…sk itself. Signed-off-by: GilboaAWS <[email protected]>
f51b647 to 4799974
See comments inline
trace!("refresh_and_update_connections: calling trigger_refresh_connection_tasks");
Self::trigger_refresh_connection_tasks(
    inner.clone(),
    addresses.clone(),
why do you clone it?
Because the Rust compiler demands it.
.read()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.collect_refresh_notifiers(&addresses);
can't trigger_refresh_connection_tasks return the notifiers?
then you won't need to clone the addresses
The cost of cloning the addresses isn't a problem, especially since it happens in the slow path.
I think it's clearer to collect them with a dedicated function where needed, and as we can see, it's only used here.
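For reference, the alternative raised in this thread could look roughly like the sketch below. It is not the PR's code: `Notifier` is a hypothetical stand-in for `tokio::sync::Notify`, and `RefreshState` / `trigger_refresh_tasks` are illustrative names. The function takes the addresses by value and hands back one notifier per address, so the caller needs neither a clone nor a second lookup.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// Hypothetical stand-in for tokio::sync::Notify.
#[derive(Debug, Default)]
struct Notifier;

#[derive(Default)]
struct RefreshState {
    // One notifier per address that currently has a refresh in progress.
    in_progress: HashMap<String, Arc<Notifier>>,
}

impl RefreshState {
    // Consumes the addresses, registers a refresh for each one that has none yet,
    // and returns the notifier for every requested address.
    fn trigger_refresh_tasks(&mut self, addresses: HashSet<String>) -> Vec<Arc<Notifier>> {
        addresses
            .into_iter()
            .map(|address| {
                self.in_progress
                    .entry(address)
                    .or_insert_with(|| Arc::new(Notifier))
                    .clone()
            })
            .collect()
    }
}
```

Whether this is clearer than a dedicated collecting function is the judgment call being discussed here; the sketch only shows that the clone can be avoided if that is the preferred trade-off.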
@@ -1371,62 +1374,147 @@ where
    }
}

async fn refresh_connections(
// Creates refresh tasks, await on the tasks' notifier and the update the connection_container.
"...and the update the connection_container."
It doesn't update the connection container. The description of this function isn't clear; please fix it.
You're 100% right.
let futures: Vec<_> = refresh_task_notifiers
    .iter()
    .map(|notify| notify.notified())
    .collect();
futures::future::join_all(futures).await;
I think you can skip the memory allocation you get from 'collect' with:
join_all(refresh_task_notifiers.iter().map(|notify| notify.notified())).await;
.refresh_address_in_progress
.contains_key(&address)
{
    info!("Skipping refresh for {}: already in progress", address);
This should be changed to debug!, since we can get a lot of these log lines when a node is down and we keep retrying the reconnection.
RefreshConnectionType::AllConnections,
None,
core.glide_connection_options.clone(),
false,
why false?
Because when sending RefreshConnectionType::AllConnections, there is no need to reuse the old connection...
    }
};

let mut conn_option = None;
let conn_option = if let Some(refresh_notifier) = reconnect_notifier {
...
} else {
None
};
None,
ConnectionCheck::OnlyAddress(address) => {
    // No connection for this address in the conn_map
    Self::trigger_refresh_connection_tasks(
again - can trigger_refresh_connection_tasks simply return the notifiers?
then it would reduce a lot of logic (lines 2423 and above)
.get_node()
.await;

let reconnect_notifier: Option<Arc<Notify>> = match core
nit: reduce redundant code lines:
let reconnect_notifier = core
    .conn_lock
    .read()
    .expect(MUTEX_READ_ERR)
    .refresh_conn_state
    .refresh_address_in_progress
    .get(&address)
    // and_then (not map) keeps the result a flat Option, since the closure itself returns an Option
    .and_then(|refresh_task_state| {
        if let RefreshTaskStatus::Reconnecting(refresh_notifier) = &refresh_task_state.status {
            Some(refresh_notifier.get_notifier())
        } else {
            None
        }
    });
ignore if you change trigger_refresh_connection_tasks to return the notifiers
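A self-contained sketch of the lookup pattern in this nit, under simplified assumptions: `Notifier` stands in for `tokio::sync::Notify`, the status enum is reduced to two variants (the second one is made up), and the map holds the status directly rather than a task-state struct. A lookup whose closure itself returns an `Option` is usually chained with `and_then`, so the result stays a flat `Option` instead of a nested one.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for tokio::sync::Notify.
#[derive(Debug)]
struct Notifier;

// Simplified status enum; only `Reconnecting` appears in the review, `Other` is made up.
enum RefreshTaskStatus {
    Reconnecting(Arc<Notifier>),
    Other,
}

// Returns the notifier only when the address has a refresh in the Reconnecting state.
fn reconnect_notifier(
    in_progress: &HashMap<String, RefreshTaskStatus>,
    address: &str,
) -> Option<Arc<Notifier>> {
    in_progress.get(address).and_then(|status| match status {
        RefreshTaskStatus::Reconnecting(notifier) => Some(Arc::clone(notifier)),
        RefreshTaskStatus::Other => None,
    })
}
```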
@@ -66,6 +66,7 @@ async def test_update_connection_password(
assert value == b"test_value"
await glide_client.update_connection_password(None)
await kill_connections(management_client)
await asyncio.sleep(1)
why?
We want to make sure the connections were already established.
Signed-off-by: GilboaAWS <[email protected]>
Issue link
This Pull Request is linked to issue (URL): [#2910]
Checklist
Before submitting the PR make sure the following are checked: